Source code for hysop.tools.misc
# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Some utilities to deal with workspaces, slices and other python objects
* :class:`~hysop.tools.misc.Utils` : some utilities to handle slices in hysop.
* :class:`~hysop.tools.misc.WorkSpaceTools` to deal with operators' local
work spaces, see :ref:`work_spaces`.
"""
import inspect
import functools
import operator
import numpy as np
from hysop.constants import HYSOP_REAL, HYSOP_INTEGER
from hysop.tools.numpywrappers import npw
[docs]
def getargspec(func):
spec = inspect.getfullargspec(func)
return (spec.args, spec.varargs, spec.varkw, spec.defaults)
[docs]
def prod(values):
"""
Like sum but for products (of integers).
"""
try:
return np.prod(values, dtype=np.int64)
except:
return np.prod(values)
[docs]
def compute_nbytes(shape, dtype):
from hysop.tools.numerics import get_itemsize
nbytes = prod(shape) * get_itemsize(dtype)
assert nbytes > 0
return nbytes
[docs]
def get_default_args(func):
"""
returns a dictionary of arg_name:default_values for the input function.
"""
args, varargs, keywords, defaults = getargspec(func)
if defaults is None:
return dict()
else:
return dict(zip(args[-len(defaults) :], defaults))
[docs]
def get_argnames(func):
"""
returns arguments name and possible varargs.
"""
argnames, varargs, _, _ = getargspec(func)
return argnames, varargs
[docs]
def args2kargs(func, args):
argnames, _, _, _ = getargspec(func)
return dict(zip(argnames, args))
[docs]
def kargs2args(func, kargs, remove=[]):
argnames, _, _, _ = getargspec(func)
return tuple(kargs[a] for a in argnames if a not in remove)
[docs]
def upper_pow2(x):
def _upper_pow2(x):
if x < 0:
raise RuntimeError("x<0")
i = 0
k = 2**i
while k < x:
i += 1
k *= 2
return k
if np.isscalar(x):
return _upper_pow2(x)
elif isinstance(x, np.ndarray):
return np.asarray([_upper_pow2(_x) for _x in x], dtype=x.dtype)
else:
return type(x)(_upper_pow2(_x) for _x in x)
[docs]
def next_pow2(x):
def _next_pow2(x):
if x <= 0:
return 1
y = upper_pow2(x)
if x == y:
y = upper_pow2(x + 1)
return y
if np.isscalar(x):
return _next_pow2(x)
elif isinstance(x, np.ndarray):
return np.asarray([_next_pow2(_x) for _x in x], dtype=x.dtype)
else:
return type(x)(_next_pow2(_x) for _x in x)
[docs]
def previous_pow2(x):
def _previous_pow2(x):
assert x >= 1
y = upper_pow2(x) // 2
if x == y:
y = upper_pow2(x - 1) // 2
return y
if np.isscalar(x):
return _previous_pow2(x)
elif isinstance(x, np.ndarray):
return np.asarray([_previous_pow2(_x) for _x in x], dtype=x.dtype)
else:
return type(x)(_previous_pow2(_x) for _x in x)
[docs]
def upper_pow2_or_3(x):
if np.isscalar(x):
y = x if x == 3 else upper_pow2(x)
else:
y = upper_pow2(x)
y[x == 3] = 3
return y
[docs]
class Utils:
"""tools to handle array and slices."""
"""
Perform an indirect sort of seq using python default sorting algorithm.
It returns an array of indices of the same length as input seq.
"""
[docs]
@staticmethod
def upper_pow2(x):
def _upper_pow2(x):
if x < 0:
raise RuntimeError("x<0")
i = 0
k = 2**i
while k < x:
i += 1
k *= 2
return k
if np.isscalar(x):
return _upper_pow2(x)
else:
return np.asarray([_upper_pow2(_x) for _x in x])
[docs]
@staticmethod
def array_to_dict(inarray):
"""
convert an array into a dictionnary,
keys being the column numbers in array
and values the content of each corresponding column
transformed into a list of slices like this:
column = [1, 4, 2, 6, ...] ---> [slice(1, 4), slice(2, 6), ...]
"""
outslice = {}
size = inarray.shape[1]
dimension = (int)(0.5 * inarray.shape[0])
for rk in range(size):
outslice[rk] = [
slice(inarray[2 * d, rk], inarray[2 * d + 1, rk] + 1)
for d in range(dimension)
]
return outslice
[docs]
@staticmethod
def intersect_slices(sl1, sl2):
"""Intersection of two lists of slices
Parameters
-----------
sl1 : a list of slices
sl2 : a list of slices
Return :
guess what ... a list of slices such that:
result[i] = intersection between sl1[i] and sl2[i]
"""
assert len(sl1) == len(sl2)
res = [None] * len(sl1)
for d in range(len(sl1)):
s1 = sl1[d]
s2 = sl2[d]
if s1.step != s2.step:
raise NotImplementedError(
"Multi-step intersection has not been implemented yet."
)
start = max(s1.start, s2.start)
stop = min(s1.stop, s2.stop)
step = s1.step
if stop <= start:
return None
res[d] = slice(start, stop, step)
return tuple(res)
[docs]
@staticmethod
def is_on_proc(sl):
"""True if sl represent a non empty set of indices."""
return (np.asarray([ind.stop != ind.start for ind in sl])).all()
[docs]
class WorkSpaceTools:
"""Tools to deal with internal work arrays for operators"""
[docs]
@staticmethod
def check_work_array(lwork, subshape, work=None, data_type=HYSOP_REAL):
"""Check properties of existing working array or
allocate some new buffers complient with some properties.
Parameters
----------
lwork : int
required number of working arrays
subshape : list of tuples of int
required shape for work array
subshape[i] == expected shape for work[i]
work : list of numpy arrays
data_type : either HYSOP_REAL or HYSOP_INTEGER
Notes
-----
work arrays are 1D arrays of size prod(subshape) that are 'reshaped'
according to the specific needs of each operator. That means
that one memory location (the 1D array) may be shared
between several operators thanks to the numpy reshape function.
"""
from hysop.tools.numpywrappers import npw
result = []
if isinstance(subshape, list):
subsize = [prod(subshape[i]) for i in range(len(subshape))]
else:
subsize = [
prod(subshape),
] * lwork
subshape = [
subshape,
] * lwork
if work is None:
for i in range(lwork):
result.append(
npw.zeros(subsize[i], dtype=data_type).reshape(subshape[i])
)
else:
assert isinstance(work, list), "rwork must be a list."
msg = "Work arrays list must be at least of size "
msg += str(lwork) + "."
assert len(work) >= lwork, msg
msg1 = "Work array size is too small."
msg2 = "Work array must be a flat array (1D)."
try:
for i in range(lwork):
wk = work[i]
assert wk.size >= subsize[i], msg1
assert len(np.where(np.asarray(wk.shape) > 1)[0]) == 1, msg2
result.append(wk.ravel()[: subsize[i]].reshape(subshape[i]))
for i in range(len(result)):
assert npw.arrays_share_data(result[i], work[i])
except AttributeError:
# Work array has been replaced by an OpenCL Buffer
# Testing the buffer size instead of shape
for i in range(lwork):
wk = work[i]
s = wk.size // subsize[i]
WorkSpaceTools._check_ocl_buffer(s, data_type)
result = work
return result
@staticmethod
def _check_ocl_buffer(s, dtype):
"""check if opencl buffer size is complient with input type."""
if dtype is HYSOP_REAL:
assert (HYSOP_REAL is np.float32 and s == 4) or (
HYSOP_REAL is np.float64 and s == 8
)
elif dtype is HYSOP_INTEGER:
assert (
(HYSOP_INTEGER is np.int16 and s == 2)
or (HYSOP_INTEGER is np.int32 and s == 4)
or (HYSOP_INTEGER is np.int64 and s == 8)
)
[docs]
@staticmethod
def find_common_workspace(operators, array_type="rwork"):
from hysop.tools.numpywrappers import npw
"""Allocate a list of common workspaces able to work
with some given operators
Parameters
----------
operators : a list or a dictionnary of operators
Each component of the list or value of the dictionnary
must be an object that may need some internal workspaces
and with a 'get_work_properties' function, hysop operators indeed.
array_type : string, optional
between 'rwork' (real arrays) or 'iwork' (integer arrays),
depending on the required work.
Default is 'rwork'.
Returns
-------
work : list of numpy arrays
workspaces common to all operators.
Example
-------
# op1, op2 some predifined operators
op1.discretize()
op2.discretize()
rwork = Utils.find_common_workspace([op1, op2])
iwork = Utils.find_common_workspace([op1, op2], 'iwork')
op1.setup(rwork=work, iwork=iwork)
op2.setup(rwork=work)
# work is a common internal list of workspaces that can be used by both
# operators.
"""
properties = []
if isinstance(operators, dict):
oplist = operators.values()
elif isinstance(operators, list):
oplist = operators
else:
raise AttributeError("operators must be a list or a dict")
for op in oplist:
properties.append(op.get_work_properties()[array_type])
return WorkSpaceTools.allocate_common_workspaces(properties)
[docs]
@staticmethod
def allocate_common_workspaces(work_properties):
"""Allocate a list of common workspaces according to
some given properties.
Parameters
----------
work_properties : list of lists of tuples
properties of workspaces. Each component of this list
must be the return value of a get_work_properties function
(for instance from an operator)
Returns
-------
work : list of numpy arrays
workspaces fitting with properties requirements.
Example
-------
# op1, op2 some predifined operators
op1.discretize()
op2.discretize()
wk_prop = []
wk_prop.append(op1.get_work_properties()['rwork'])
wk_prop.append(op2.get_work_properties()['rwork'])
work = Utils.find_common_workspace(wk_prop)
op1.setup(rwork=work)
op2.setup(rwork=work)
# work is a common internal list of workspaces that can be used by both
# operators.
"""
assert isinstance(work_properties, list)
# Max number of required working arrays:
properties = [p for p in work_properties if p is not None]
lwork = max(len(prop) for prop in properties)
# Then find the max required workspace for work array
shapes = [
(0,),
] * lwork
for prop in properties:
lp = len(prop)
for i in range(lp):
shapes[i] = tuple(np.maximum(shapes[i], prod(prop[i])))
work = [npw.zeros(shape) for shape in shapes]
return work